package com.wudl.hudi.sink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

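/**
 * @author: wudl
 * @date: Created in 2022-02-20 1:05
 * @description: Flink SQL job that reads JSON order records from the Kafka topic
 *               "hudiflink" and writes them unchanged to the Kafka topic "wudltopicdoris01".
 * @modified By:
 * @version: 1.0
 */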

public class kafkaSinkafka {

    /** Kafka broker address shared by the source and the sink table. */
    private static final String BOOTSTRAP_SERVERS = "192.168.1.161:6667";

    /** Topic the job reads order records from. */
    private static final String SOURCE_TOPIC = "hudiflink";

    /** Topic the job writes order records to. */
    private static final String SINK_TOPIC = "wudltopicdoris01";

    /** Consumer group id used when reading from {@link #SOURCE_TOPIC}. */
    private static final String SOURCE_GROUP_ID = "gid-1002";

    /**
     * Column definitions shared by both tables. Keeping them in one place guarantees that
     * {@code insert into ... select *} always sees matching schemas on both sides.
     */
    private static final String ORDER_COLUMNS =
            "  orderId STRING,\n" +
            "  userId STRING,\n" +
            "  orderTime STRING,\n" +
            "  ip STRING,\n" +
            "  orderMoney DOUBLE,\n" +
            "  orderStatus INT\n";

    /**
     * Registers a Kafka-backed source table and a Kafka-backed sink table, then submits an
     * INSERT statement that copies every row from the source to the sink.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql(sourceTableDdl());
        tableEnv.executeSql(sinkTableDdl());

        // NOTE(review): the result of executeSql is not awaited here. If this job is ever
        // launched from an IDE / local environment, confirm whether the returned TableResult
        // must be awaited to keep the job alive.
        tableEnv.executeSql("insert into order_kafka_sink  select * from  order_kafka_source");
    }

    /**
     * Builds the DDL for {@code order_kafka_source}: a JSON-formatted Kafka table that starts
     * reading at the latest offset and tolerates missing fields and unparsable records.
     *
     * @return the CREATE TABLE statement for the source table
     */
    private static String sourceTableDdl() {
        return "CREATE TABLE order_kafka_source (\n" +
                ORDER_COLUMNS +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = '" + SOURCE_TOPIC + "',\n" +
                "  'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "',\n" +
                "  'properties.group.id' = '" + SOURCE_GROUP_ID + "',\n" +
                "  'scan.startup.mode' = 'latest-offset',\n" +
                "  'format' = 'json',\n" +
                "  'json.fail-on-missing-field' = 'false',\n" +
                "  'json.ignore-parse-errors' = 'true'\n" +
                ")";
    }

    /**
     * Builds the DDL for {@code order_kafka_sink}: a JSON-formatted Kafka table used only as
     * the INSERT target.
     *
     * <p>The consumer group id, the scan startup mode and the JSON parse-error options are
     * deliberately omitted: they configure reading/deserialization and are not needed for a
     * table that is only written to.
     *
     * @return the CREATE TABLE statement for the sink table
     */
    private static String sinkTableDdl() {
        return "CREATE TABLE order_kafka_sink (\n" +
                ORDER_COLUMNS +
                ") WITH (\n" +
                "  'connector' = 'kafka',\n" +
                "  'topic' = '" + SINK_TOPIC + "',\n" +
                "  'properties.bootstrap.servers' = '" + BOOTSTRAP_SERVERS + "',\n" +
                "  'format' = 'json'\n" +
                ")";
    }
}
